package com.atuguigu.flink.Day08;

import com.atuguigu.flink.Day01.Singlesensor.SensorReading;
import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class Example5 {
    // 事件时间窗口温度平均值
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<SendsorReading> stream = env
                .addSource(new SensorSource())
                .filter(r -> r.id.equals("sensor_1"))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<SendsorReading>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(new SerializableTimestampAssigner<SendsorReading>() {
                                    @Override
                                    public long extractTimestamp(SendsorReading element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        //TODO 1 table api
        Table table = tEnv
                .fromDataStream(stream, $("id"), $("temperture").as("temp"),
                        $("timestamp").rowtime().as("ts"));

        Table tableResult = table
                .window(Tumble.over(lit(10).second()).on($("ts")).as("w"))
                .groupBy($("id"), $("w"))
                .select($("id"), $("temp").avg(), $("w").end());
        tEnv.toRetractStream(tableResult, Row.class);

        // TODO 2 SQL
        tEnv.createTemporaryView(
                "sensor",
                stream,
                $("id"),
                $("temperture").as("temp"),
                $("timestamp").rowtime().as("ts")
                );
        Table table1 = tEnv.sqlQuery("SELECT id,AVG(temp),TUMBLE_START(ts,INTERVAL '10' SECOND),TUMBLE_END(ts,INTERVAL '10' SECOND) FROM sensor GROUP BY id,TUMBLE(ts,INTERVAL '10' SECOND)");
        tEnv.toRetractStream(table1,Row.class).print();



        env.execute();

    }
}
